home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
SPACE 2
/
SPACE - Library 2 - Volume 1.iso
/
apps
/
453
/
rtxqio.c
< prev
next >
Wrap
C/C++ Source or Header
|
1990-02-16
|
10KB
|
403 lines
#include <stdio.h>
#include <osbind.h>
#include <tosdefs.h>
#include <rtxbind.h>
#include "rtxqio.h"
/*
* RTXQIO - Queued (Asynchronous) I/O Library
*
* Description:
* This code implements Queued, asynchronous I/O
* under Micro RTX (tm) for those operating
* system hackers that like this style of
* I/O processing. QIO comes from a famous
* operating system which shall remain nameless.
*
* It demonstrates the types of facilities that
* can be implemented on top of RTX, and is
* really just a skeleton for a more complete
* implementation.
*
* The mechanism works like this: I/O requests
* are queued on a QIO channel, and then serviced
* in FIFO order by an asychronous server process.
* Once the I/O request is queued, the requesting
* process may continue with other processing.
*
* Internally, the implementation does it by creating
* a server process for each QIO channel, along with
* two RTX message queues. When a process queues an
* I/O request, it builds an I/O request packet, and
* sends it over an RTX Queue to the appropriate
* server process. When the process wishes to
* synchonize with the completion of the I/O request,
* it simply waits for a message back from the
* server.
*
* The server process and RTX queues ares created
* dynamically as QIO channels are needed. Likewise,
* they are deleted when a QIO channel is closed.
*
* I've always thought the QIO scheme was a screwy way
* to do things. But you asked for it.
*
*/
/*
* function to get an I/O request Packet from a message queue
*/
struct qio_request *get_iorp(msgq)
char *msgq;
{
struct qio_msg msg;
/* get next queued I/O request (wait forever) */
if (q_req(msgq, &msg, 0, 0L))
return(0); /* failure */
/* validate iorp message */
if (msg.iorp_valid != IORP_VALID)
return(0); /* failure */
return(msg.iorp);
}
/*
* Here is the server process. One incarnation of this process
* exists for each QIO channel, therefore it must be reentrant.
*/
static server(channel)
struct qio_channel *channel; /* channel */
{
struct qio_request *iorp;
int operation;
struct qio_msg msg;
/* loop until DELETE message received */
do {
/* get next queued I/O request */
if (q_req(channel->serverq, &msg, 0, 0L))
continue;
/* validate iorp message */
if (msg.iorp_valid != IORP_VALID)
continue;
iorp = msg.iorp;
/* perform requested operation */
switch (operation = iorp->operation) {
case QIO_FOPEN:
iorp->retcode = Fopen(iorp->buffer, iorp->handle);
break;
case QIO_FCLOSE:
iorp->retcode = Fclose(iorp->handle);
break;
case QIO_FREAD:
iorp->retcode = Fread(iorp->handle,
iorp->count, iorp->buffer);
break;
case QIO_FWRITE:
iorp->retcode = Fwrite(iorp->handle,
iorp->count, iorp->buffer);
break;
case QIO_DELETE:
/* terminate */
iorp->retcode = 0;
break;
/* add your own options here */
default:
/* Invalid Function */
iorp->retcode = EINVFN;
break;
}
/* send the IO request back to requestor */
q_send(channel->replyq, &msg);
} while (operation != QIO_DELETE);
p_delete((char *)0);
}
/*
* This function "opens" a QIO channel by creating a server process
* and the associated message queues.
*/
struct qio_channel *qio_open(name)
char *name;
{
struct qio_channel *channel; /* stack channel for p_create */
int dummy; /* dummy integer */
char namebuf[16]; /* name buffer for q_create */
/* get a buffer for the "channel" data structure */
if ((channel = m_alloc((long)sizeof(struct qio_channel))) != 0) {
/* create RTX message Q for QIO serverq */
strcpy(namebuf, "SQ_");
strcat(namebuf, name);
if ((channel->serverq = q_create(namebuf, 0)) == 0) {
m_free(channel);
return(0); /* fail */
}
/* create RTX message Q for QIO replyq */
strcpy(namebuf, "RQ_");
strcat(namebuf, name);
if ((channel->replyq = q_create(namebuf, 0)) == 0) {
q_delete(channel->serverq);
m_free(channel);
return(0); /* fail */
}
/* spawn a copy of the server process */
if (p_create(name,
p_priority(0, (char *)0),
p_slice(0, (char *)0),
server,
sizeof(int *),
&channel,
4096L) == 0) { /* failure if couldn't create */
q_delete(channel->serverq);
q_delete(channel->replyq);
m_free(channel);
return(0); /* fail */
}
}
return(channel); /* return channel address to caller */
}
/*
* This function queues an I/O request. It builds an I/O Request Packet
* and sends it to the server.
*
* The parameters passed in the I/O Request Packet (IORP) are sent
* to the server untouched, so they really cound be used for anything.
* The server validates the parameters, and caries out the specified
* operation. This function simply sends the request to the server.
*/
qio_start(channel, operation, handle, buffer, count)
struct qio_channel *channel;
int operation;
int handle;
char *buffer;
long count;
{
struct qio_request *iorp;
struct qio_msg msg;
/* build the IO request packet */
if ((iorp = m_alloc((long)sizeof(struct qio_request))) == 0) {
return(-1); /* failure */
}
/* load the QIO request parameters */
iorp->operation = operation; /* opcode */
iorp->handle = handle; /* handler parameter */
iorp->buffer = buffer; /* buffer address */
iorp->count = count; /* count */
msg.iorp = iorp; /* message addresses IORP */
msg.iorp_valid = IORP_VALID; /* Indicate message type */
/* send the IO request to the server */
if (q_send(channel->serverq, &msg)) {
m_free(iorp);
return(-1); /* fail */
}
return(0); /* success */
}
/*
* This function returns the completion of the specified I/O request.
* The value returned corresponds to the valid return values for the
* given class of operation.
*
* This is how the requesting process synchronizes with the queued
* I/O operation. Replys come back in the same order that they were
* send out via the qio_start() service. If you are expecting a
* reply out of order, you'll have a long wait (forever).
*/
long qio_stop(channel)
struct qio_channel *channel;
{
struct qio_request *iorp;
long retcode;
/* wait for next I/O Completion */
if ((iorp = get_iorp(channel->replyq)) == 0)
return(-1); /* failure */
retcode = iorp->retcode; /* get return code */
m_free(iorp); /* free the IO packet */
return(retcode); /* return return code */
}
/*
* This function "closes" a QIO channel by sending a "DELETE"
* message to the server, and waiting for the reply. This is
* necessary to rundown any pending I/O operations. When
* all outstanding I/O is complete, the message queue and
* associated data structures are deleted.
*/
qio_close(channel)
struct qio_channel *channel;
{
struct qio_request *iorp;
struct qio_msg msg;
if ((iorp = m_alloc((long)sizeof(struct qio_request))) == 0) {
return(0);
}
iorp->operation = QIO_DELETE;
msg.iorp = iorp;
msg.iorp_valid = IORP_VALID;
if (q_send(channel->serverq, &msg)) {
m_free(iorp);
return(-1); /* failure */
}
/* Wait for all I/O completions */
do {
if ((iorp = get_iorp(channel->replyq)) == 0)
break;
} while (iorp->operation != QIO_DELETE);
/* delete the message queues */
q_delete(channel->serverq);
q_delete(channel->replyq);
/* free the "channel" data structure */
m_free(channel);
return(0);
}
/**************** END OF QIO LIBRARY **************************************/
/**************************************************************************
*
* The following is just sample code to show how the above functions
* may be used in your programs.
*
**************************************************************************/
#define BUFSIZE 1024
char buf1[BUFSIZE]; /* buffer for reading */
extern char *_base; /* GEMDOS basepage */
/*
* start_read - start a QIO read
*/
start_read(chan, han, buf)
struct qio_channel *chan;
int han;
char *buf;
{
int i;
long retcode;
if (qio_start(chan, QIO_FREAD, han, buf, (long)BUFSIZE)) {
fprintf(stderr, "failed qio_start\n");
exit(1);
}
}
/*
* open_file - open a file using the QIO services
*/
open_file(chan, name)
struct qio_channel *chan;
char *name;
{
int i;
long retcode;
if (qio_start(chan, QIO_FOPEN, 0, name, 0L)) {
fprintf(stderr, "open_file: failed qio_start\n");
exit(1);
}
retcode = qio_stop(chan);
printf("open_file: I/O Complete, retcode=%ld\n", retcode);
return((int)retcode);
}
/*
* close_file - close file using QIO services
*/
close_file(chan, han)
struct qio_channel *chan;
int han;
{
int i;
long retcode;
if (qio_start(chan, QIO_FCLOSE, han, (char *)0, 0L)) {
fprintf(stderr, "close_file: failed qio_start\n");
exit(1);
}
retcode = qio_stop(chan);
printf("close_file: I/O Complete, retcode=%ld\n", retcode);
return((int)retcode);
}
/*
* The main routine.
*/
main(argc, argv)
int argc;
char *argv[];
{
struct qio_channel *chan1;
int han1;
struct config config;
/* configuratioin table */
config.basepage = _base;
config.max_proc = 32;
config.max_msgs = 200;
config.max_queues = 10;
config.create_call = 0L;
config.delete_call = 0L;
config.switch_call = 0L;
/* install the MICRO RTX kernel */
rtx_install(&config);
/* always reads a specified file */
if (argc != 2) {
printf("usage: qiotest file\n");
exit(1);
}
/* open a QIO channel */
if ((chan1 = qio_open("Chan1")) == 0) {
fprintf(stderr, "failed qio_open\n");
exit(1);
}
printf("QIO channel %08lx open\n", chan1);
/* open the specified file */
han1 = open_file(chan1, argv[1]);
/* loop reading blocks from the file */
do {
/* Queue the I/O request */
start_read(chan1, han1, buf1);
/* Do other processing, asynchronously */
printf("I/O Operation Queued. Doing some other stuff now.\n");
} while (qio_stop(chan1) > 0); /* synchronize */
/* close the file */
close_file(chan1, han1);
/* close the QIO channel */
qio_close(chan1);
printf("QIO channel %08lx closed\n", chan1);
/* All done */
rtx_remove();
}